Java 并发(二) - Executor 框架

Posted by LinYaoTian on 2018-09-06

任务执行

选择串行的方式执行任务,串行处理机制通常无法提高高吞吐率和快速响应性,于是我们可以显式地为任务创建线程,为每一个请求创建一个线程来执行任务,这样可以实现更高的响应性。

但是这样会带来很多问题:

  1. 线程的创建和销毁开销非常高
  2. 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程就会闲置。大量空闲的线程会占用许多内存,给 GC 带来很大的压力,而且大量线程在竞争 CPU 资源时还会产生其他性能开销。
  3. 可创建线程的数量存在一个限制。这个限制值受多个平台制约,包括 JVM 的启动参数,Thread 构造函数中请求栈的大小以及操作系统的限制。

总得来说,增加线程可以提高系统的吞吐率但是如果超出了这个范围,再创建更多的线程只会降低程序的速度,更严重会导致奔溃。

使用 Executor 框架

对于线程和任务,任务是一组逻辑工作单元,而线程是使任务异步执行的机制,Executor 框架可以将线程和任务协调起来。

Executor 基于生产者—消费者模型,它提供了一个标准的方法,将任务的提交和执行过程解耦,用 Runnable 来表示任务。提交任务的操作相当于生产者,执行任务的线程相当于消费者。

Executor 有两种实现方式:

  • 每线程每任务

    1
    2
    3
    4
    5
    public class ThreadPerTaskExecutor implements Executor {  
    public void execute(Runnable command) {
    new Thread(command).start();
    }
    }
  • 一个线程所有任务

    1
    2
    3
    4
    5
    public class WithinThreadExecutor implements Executor {  
    public void execute(Runnable command) {
    command.run();
    }
    }

线程池

线程池的优势:

  1. 通过重用现有的线程而不是创建新线程,可以减少创建和销毁线程的开销 。
  2. 当请求到来时,由于线程已经存在,可以减少等待时间,从而提高了响应性。

可以通过调用 Executors 中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool 创建一个定长的线程池, 每当提交一个任务就创建一个线程, 直到达到池的最大长度, 这时线程池会保持长度不再变化. (一任务一线程)

  • newCachedThreadPool 创建一个可缓存的线程池, 如果当前线程的长度超过了处理的需要时, 它可以灵活的回收空闲的线程, 当需求增加时, 它可以灵活的增加新的线程, 而并不会对池的长度做任何限制.。(缓存线程池)

  • newSingleThreadExecutor 创建一个单线程化executor, 它只创建唯一的工作者线程来执行任务, 如果这个线程异常结束, 会有另一个取代它, 但是任务会保存在一个 queue 中等待执行。(多任务一线程)

  • newScheduleThreadPool 创建一个定长的线程池, 而且支持定时的以及周期性执行任务, 类似 timer。(定时线程池)

可以看见上面的静态方法都几乎是通过 ThreadPoolExecutor 来实现的

参数含义:

  • corePoolSize:池中所保存的线程数,包括空闲线程
  • maximumPoolSize:线程池允许的最大线程数量
  • keepAliveTime:存活时间,当线程数大于核心线程数时,多出来的线程为空余线程,当空余线程在一定的时间内没有新任务到达执行,则终止该线程
  • unitkeepAliveTime参数的时间单位
  • workQueue:执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务,使用不同的Queue,能够得到不同的功能,例如 newFixThreadPool 使用的是 linkedBolckQueue,而 newCachedThreadPool 使用的是SynchronizedQueue

Executor 的生命周期:ExecutorService

从上面所述,我们可以看出 Executor通常是创建线程来执行任务,为了解决执行服务的生命周期问题,Executor扩展了 ExecutorService 接口,添加了生命周期的管理方法

1
2
3
4
5
6
7
8
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
....
}

ExecutorService 有三种状态: running(运行), shuting down(关闭), terminated(已终止)

shuting down(关闭)状态:

  • shutdown:将停止接受新的任务, 同时等待已经提交的任务完成, 包括尚未完成的任务
  • showdownNow:会启动一个强制的关闭过程, 尝试取消所有运行中的任务和排在队列中尚未开始的任务,并把排队中尚未开始的任务返回。

对于关闭后提交到 ExecutorService 中的任务, 会被(拒接执行处理器)rejected execution handler 处理,它会抛弃任务,或者使得 execute 方法抛出一个未检查的 RejectedExecutionException

terminated(已终止)状态:

等所有任务都完成之后,进入terminated 状态, 可以调用 awaitTermination 等待 ExecutorService 到达终止状态, 也可以轮询检查 isTerminated 判断是否终止。通常 shutdown 会紧随 awaitTermination 之后, 这样可以产生同步地关闭ExecutorService 的效果。

延迟任务和周期任务

延迟任务:在 100ms 后执行任务

周期任务:没 100ms 执行一次任务

一般来说,Timer 类用于执行延迟任务和周期任务,但是 Timer 有以下两个问题:

  1. 只会创建一个线程来执行所有 task, 如果一个 task 非常耗时, 会导致其他的 task 的实效准确性出问题。
  2. Timer 线程并不捕获异常,对于一些未检查异常(RuntimeException)抛出,Timer 线程会被终止。

因此我们一般不使用 Timer,可以使用 DelayQueue 来实现自己的调度服务, 它使 BlockingQueue 的一种实现,并为ScheduledThreadPoolExecutor 提供了调度功能。 其内部包含了一个 Delayed 对象的容器,Delayed 是一种延迟时间对象, 只有元素过期后,它才会让你执行 take 获取元素(这个元素实现了 Delayed 接口)。

Callable 对比 Runnable

1
2
3
4
5
public class ThreadPerTaskExecutor implements Executor {  
public void execute(Runnable command) {
new Thread(command).start();
}
}

Exceutor 框架使用 Runnable 作为最基本的任务形式,但是 Runnable 有一种很大的局限性,它不能返回一个值或抛出一个受检查的异常。

Callable 是一种更好的任务形式,它能返回一个值或者抛出一个异常。

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

Executor 执行任务时有 4 个生命周期阶段:创建,提交,开始,完成。
对于已提交但尚未开始的任务,调用 shutdownNow() 可以取消并返回这些任务;对于已经开始的任务,只有当他们能响应中断时,才能取消。

Callable 和 Future 结合使用返回结果

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

get() 方法的行为将取决于任务的状态,如果任务已经完成,那么 get() 会立即返回结果或者抛出一个 Exception,如果任务没有完成,那么 get() 将阻塞并直到任务完成。

将一个 Runnable 或者 Callable 任务传递给 ExecutorServicesubmit 方法,将返回一个 Future 用于获得任务的执行结果或者取消任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final ExecutorService executor = Executors.newCachedThreadPool();

void renderPage(String source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
//下载图片
Callable<List<ImageData>> task = new Callable<List<ImageData>>() {

@Override
public List<ImageData> call() throws Exception {
List<ImageData> list = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos) {
list.add(imageInfo.download());
}
}

};

//处理图片
Future<List<ImageData>> future = executor.submit(task);//将一个Runnable或者Callable任务传递给ExecutorService的submit方法,将返回一个Future用于获得任务的执行结果或者取消任务。
renderText(source);

try {
List<ImageData> list = future.get();//get()方法会一直阻塞,直到callable的任务全部完成。
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消任务
future.cancel(true);
} catch (ExecutionException e) {
e.printStackTrace();
}
}

上面程序还存在一个问题不是很好,就是下载图片和处理图片的速率的协调问题,如果处理图片的速度远远高于下载图片的速度,那跟串行执行的效率可能相差不是很大。

CompletionService 和 BlockingQueue

CompletionService 用来将 ExecutorBlockingQueue 进行结合, 将 Callable 任务提交给它执行, 然后使用类似队列中的 takepoll 在结果完整时获得这个结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test {
private final ExecutorService executor = Executors.newCachedThreadPool();

void renderPage(String source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
CompletionService<ImageData> service = new ExecutorCompletionService<ImageData>(
executor);

for (final ImageInfo imageInfo : imageInfos) {
service.submit(new Callable<ImageData>() {
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}

renderText(source);

for (int i = 0; i < imageInfos.size(); i++) {
Future<ImageData> f;
try {
f = service.take();
ImageData imageData = f.get();
renderImage(imageData);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}

上面这个改进相对于前一个,在两个方面提高了性能:

  1. 为每一个图片的下载都创建一个独立任务,把串行下载改为了并行下载,减少了下载图像的总时间
  2. 通过 CompletionService 来即时获取每一个下载任务的 Future,并通过该 Future 来获取结果进行处理,有更好的并发性。

取消和关闭

Java 没有提供任何机制来安全地终止线程,但是它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

通过取消标志终止线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PrimeGenerator implements Runnable {  
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
// 每次执行循环之前都检查标志
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}

public void cancel() {
this.cancelled = true;
}

public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}

客户端执行取消

1
2
3
4
5
6
7
PrimeGenerator g = new PrimeGenerator();  
new Thread(g).start();
try {
Thread.sleep(10);
} finally {
g.cancel();
}

但是这里会有一个问题,这种标志,在生产者消费者模型中,当队列已被填满,生产者 put 时会被阻塞,如果此时消费者希望取消生产者任务,于是它调用 cancel() 来改变取消标志,但是生产者无法识别这个标志,因为它已经被阻塞了,它无法从 put 的阻塞状态恢复过来。

通过中断机制来终止线程

Thread 中的中断方法:

1
2
3
public void interrupt();
public boolean isInterrupted();
public static boolean interrupted();

interrupted():会清除当前线程的中断状态,除非你想屏蔽掉这个中断,否则你要对它处理:抛出 InterruptedException 或者再次调用 interrupt 恢复中断。

对中断的理解应该是:它并不会真正中断一个正在运行的线程; 它仅仅只是发送中断请求(这一点很重要)。

对于上面取消标志的生产者消费者模型,如果使用中断标志就能够解决为:在每一次迭代循环时,有两个位置可以检测出中断,阻塞的 put 中和循环开始处查询中断状态时,当检测到中断时,我们对它进行处理就可以实现任务的取消了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PrimeProducer extends Thread {  
private final BlockingQueue<BigInteger> queue;

public PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
queue.put(p.nextProbablePrime());
}
} catch (InterruptedException e) {

}
}

public void cancel() {
interrupt();
}
}

通过 Future 来实现取消

ExecutorService.submit() 将返回一个 Future 来管理任务,Future 拥有一个 cancle 方法。

1
boolean cancel(boolean mayInterruptIfRunning);

如果 mayInterruptIfRunningtrue,并且任务当前正在某一个线程运行,那么这个线程能够被中断,如果线程已经结束,那么执行取消操作也不会带来任何影响; 如果为 false,那么意味着“如果任务还没有启动,则不要运行它”。

当 Future.get 抛出 InterruptedException 或者 TimeoutException 时,如果你知道不再需要结果,那么就可以调用Future.cancle 来取消任务。

关闭服务

如果一个方法需要处理一批任务, 并在所有任务结束前不返回, 那么它可以通过使用私有的 Executor 来简化服务的生命周期, 其中 Executor 的寿命限定在该方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {  
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicBoolean hasNewMail = new AtomicBoolean(false); // 使用Atmic去掉volatile是因为内部Runnable访问hasNewMail标识, 那么必须是final的, 这样才能避免被修改
try {
for (final String host : hosts) {
exec.execute(new Runnable() {
@Override
public void run() {
if (checkMail(host)) {
hasNewMail.set(true);
}
}

});
}
} finally {
exec.shutdown(); //关闭
exec.awaitTermination(timeout, unit); //等待结束
}
return hasNewMail.get();
}

ExecutorshutdownNow 方法强行关闭一个 ExecutorService 时, 它试图取消正在进行的任务, 并返回那些已经提交但是尚未开始的任务。然而,对于那些已经开始但尚未结束的任务,我们用常规的方法无法找出,但是可以通过一个容器来保存这些已开始但尚未结束的任务。

线程池的配置与调优

线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能导致死锁。在单线程的 Executor 中,第二个任务停留在工作队列中,等待第一个任务的完成,之后到第二个任务开始执行,而第一个任务又需要第二个任务完成的结果,因此这样就导致死锁。只要线程池中的任务需要无限期地等待一些必须由池中其它任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,除非线程池足够大,否则将发生线程饥饿死锁。

如果阻塞时间过长,即使不死锁,响应性也会很差。这种情况,我们要限定等待的时间,不要无限的等。大多数平台类库中的阻塞方法, 都同时有限时的和非限时的两个版本, 比如 Thread.join , BlockingQueue.put, CountDownLatch.await, Selector.select。如果出现超等待, 你可以把任务标识为失败, 中止它或者把它重新放回队列, 准备之后执行。

ThreadPoolExecutor

  1. 核心池大小, 最大池大小, 存活时间共同管理着线程的创建与销毁。即使没有任务执行, 池的大小也等于核心池的大小, 并且直到工作队列充满前, 池都不会创建更多的线程。最大池大小是可以同时活动的线程的上限, 如果一个线程已经闲置的时间超过了存活时间, 它将成为一个被收回的候选者, 如果当前的池的大小超过了核心池的大小, 线程会终止该候选者。
  2. workQueue 管理这 Executor 的实现方式。

总结来说:各种不同类型的线程池,即使通过核心池大小、最大池大小、 存活时间和、实现队列来实现各自类型的功能。

  • newFixedThreadPool:工厂为请求的池设置了核心池的大小和最大池的大小, 而且它永远不会超时。
  • newCachedThreadPool:工厂将最大池的大小设置为 Integer.MAX_VALUE, 核心池的大小设置为0, 超时时间设置为1分钟, 这样创建出来的无限扩大的线程池会在需求量减少的情况下减少线程数量。
  • ThreadPoolExecutor 提供一个 BlockingQueue 来保存等待执行的任务。基本的排队方法有3种,无界队列,有界队列和同步移交(Synchronous)。
  • newFixedThreadPool 和 newSingleThreadExecutor:默认使用的是一个无界的 LinkedBlockingQueue, 如果所有的工作者线程都处于忙碌状态, 任务将会在队列中等候, 如果任务持续地到达, 超过了它被执行的速度, 队列会无限地增加.
  • SynchronousQueue:当你需要一个庞大,且线程无限的线程池,可以使用 SynchronousQueue,它完全绕开队列, 将任务直接从生产者移交到工作者线程, 因为 SynchronousQueue 并不是一个真正的队列, 而是一种管理直接在线程间移交信息的机制。 只有当池的大小是无限的, 或者可以接受任务被拒绝, SynchronousQueue 才是一个有实际价值的选择, newCachedThreadPool 工厂就是用了 SynchronousQueue。

饱和策略

当有界队列被填满后,饱和策略就游泳了。ThreadPoolExecutor 的饱和策略可以通过调用 setRejectedExecutionHandler来修改。JDK 提供了几种不同的实现,每一种实现有不同的饱和策略:AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy

  1. 中止策略(AbortPolicy):默认的饱和策略,会抛出 RejectedExecutionException ,调用者可以捕获这个隐藏然后编写满足自己需求的处理代码。
  2. 抛弃策略(DiscardPolicy):当最新提交的任务不能进入队列等待执行时, 遗弃(discard)策略会默认放弃这个任务 。
  3. 遗弃最旧策略(DiscardOldestPolicy):选择丢弃的任务是本应该接下来就应该执行的任务, 该策略还会尝试去重新提交新任务。(该策略最好不要和优先级队列一起使用) 。
  4. 调用者运行策略(CallerRunsPolicy):既不会丢弃哪个任务, 也不会抛出任何异常. 它会把一些任务退回到调用者那里, 从此缓解新任务流. 他不会在池线程中执行最新提交的任务, 但是他会在一个调用了 execute 的线程中执行。当工作队列充满后, 并没有预置的饱和策略来阻塞 execute

当工作队列充满后, 并没有预置的饱和策略来阻塞 execute 。但是,使用 Semaphore 信号量可以实现这个效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class BoundedExecutor {  
private final Executor exec;
private final Semaphore semahpore;

public BoundedExecutor(Executor exec, int bound) {
super();
this.exec = exec;
this.semahpore = new Semaphore(bound);
}

public void submitTask(final Runnable command) throws InterruptedException {
semahpore.acquire(); //请求资源
try {
exec.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semahpore.release();
}
}
});
} catch (RejectedExecutionException e) {
semahpore.release();
}
}
}

定制 ThreadPoolExecutor

大多数通过构造函数传给 ThreadPoolExecutor 的参数(核心池大小, 最大池大小, 活动时间, 拒绝处理器) 都可以在创建之后通过 setter 进行设置。如果 Executors 是通过 Executors 中的某个工厂方法创建(newSingleThreadExecutor 除外),那么可以将结果的类型转换为 ThreadPoolExecutor 以访问设置器(setter):

1
2
3
4
ExecutorService executor = Executors.newCachedThreadPool();
if(executor instanceof ThreadPoolExecutor){
((ThreadPoolExecutor) executor).setCorePoolSize(10);
}

拓展 ThreadPoolExecutor

ThreadPoolExecutor 提供了几个可以在子类化中改写的方法(afterExecute, beforeExecute, terminated),这些方法可以用于拓展 ThreadPoolExecutor 的行为。

  1. 在执行任务的线程将调用 beforeExecuteafterExecute等方法。
  2. 无论任务是正常地从 run 返回, 还是抛出一个异常, afterExecute 都会被调用, 如果抛出一个 Error则不会。
  3. 如果 beforeExecute 抛出一个 RuntimeException , 任务不会被执行, afterExecute 也不会被调用。
  4. terminated 会在线程池完成关闭时调用, 也就是当所有任务已完成并且所有工作者线程也已经关闭后。terminated 可以用来释放 Executor 在生命周期中分配的一些资源, 还可以发出通知, 记录日志或者完成统计信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TimingThreadPool extends ThreadPoolExecutor {  
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTask = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
totalTime.addAndGet(taskTime);
numTask.incrementAndGet();
} finally {
super.afterExecute(r, t);
}
}

@Override
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTask.get()));
}finally {
super.terminated();
}
}
}

并行实现递归算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//树节点的串行递归
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
for (Node<T> node : nodes) {
results.add(node.compute());
sequentialRecursive(node.getChildren(), results);
}
}

//树节点的并行递归
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (final Node<T> node : nodes) {
exec.execute(new Runnable() {
@Override
public void run() {
results.add(node.compute());
}
});
parallelRecursive(exec, nodes, results);
}
}

public <T> Collection<T> getParallelResult(List<Node<T>> nodes) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}